/*
 *    Copyright 2022 The DSMS Authors.
 *
 *    Licensed under the Apache License, Version 2.0 (the "License");
 *    you may not use this file except in compliance with the License.
 *    You may obtain a copy of the License at
 *
 *        http://www.apache.org/licenses/LICENSE-2.0
 *
 *    Unless required by applicable law or agreed to in writing, software
 *    distributed under the License is distributed on an "AS IS" BASIS,
 *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *    See the License for the specific language governing permissions and
 *    limitations under the License.
 */

package com.dsms.modules.node.task;

import cn.hutool.json.JSONUtil;
import com.alibaba.fastjson2.JSON;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.dsms.common.constant.*;
import com.dsms.common.exception.DsmsEngineException;
import com.dsms.common.remotecall.model.FailedDetail;
import com.dsms.common.remotecall.model.FinishedDetail;
import com.dsms.common.remotecall.model.RemoteResponse;
import com.dsms.common.taskmanager.TaskException;
import com.dsms.common.taskmanager.TaskStrategy;
import com.dsms.common.taskmanager.model.Step;
import com.dsms.common.taskmanager.model.Task;
import com.dsms.common.taskmanager.service.IStepService;
import com.dsms.common.taskmanager.service.ITaskService;
import com.dsms.dfsbroker.node.api.NodeApi;
import com.dsms.dfsbroker.node.request.OsdAddRequest;
import com.dsms.dfsbroker.osd.crushmap.api.CrushmapApi;
import com.dsms.dfsbroker.osd.crushmap.model.remote.CrushmapBucket;
import com.dsms.dfsbroker.osd.crushmap.request.CreateBucketRequest;
import com.dsms.dfsbroker.osd.osd.api.OsdApi;
import com.dsms.dfsbroker.osd.osd.model.remote.OSDStatusResult;
import com.dsms.dfsbroker.osd.osd.request.UpdateOsdBucketRequest;
import com.dsms.modules.node.model.dto.NodeDeviceManageDto;
import com.dsms.modules.util.RemoteCallUtil;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;

import java.time.Duration;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Objects;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import static org.apache.commons.lang3.ThreadUtils.sleep;


/**
 * Task strategy that adds an OSD on a node device.
 *
 * <p>{@link #execute(Task)} runs two steps: it creates the OSD through {@link NodeApi}, then moves
 * the new OSD under its host bucket in the crushmap. When either step fails the task is put into
 * the ROLLBACK status, and {@link #rollback(Task)} stops/purges the OSD (if it exists) and zaps the
 * device by delegating to {@link RemoveOsdTask}.
 */
@Slf4j
@Service(TaskTypeEnum.TypeConstants.ADD_OSD)
public class AddOsdTask implements TaskStrategy {

    /** Compiled once; group 1 is parsed as the id of the newly created osd. */
    private static final Pattern OSD_ID_PATTERN = Pattern.compile(OsdAddRequest.ADD_OSD_SUCCESS_PATTERN);

    /** Pause between two osd status polls, so the remote API is not hit in a tight loop. */
    private static final Duration OSD_JOIN_POLL_INTERVAL = Duration.ofSeconds(1);

    private static final String CREATE_OSD_ERROR = "could not create osd";

    private static final String MOVE_OSD_ERROR = "could not move osd to its default location in crushmap";

    private final OsdApi osdApi;

    private final NodeApi nodeApi;

    private final CrushmapApi crushmapApi;

    private final ITaskService taskService;

    private final IStepService stepService;

    private final RemoveOsdTask removeOsdTask;

    @Autowired
    public AddOsdTask(OsdApi osdApi, NodeApi nodeApi, CrushmapApi crushmapApi, ITaskService taskService, IStepService stepService, RemoveOsdTask removeOsdTask) {
        this.osdApi = osdApi;
        this.nodeApi = nodeApi;
        this.crushmapApi = crushmapApi;
        this.taskService = taskService;
        this.stepService = stepService;
        this.removeOsdTask = removeOsdTask;
    }

    /**
     * Creates the osd described by the task parameter and moves it to its host bucket.
     *
     * @param task the task to run; its task parameter is parsed as a {@link NodeDeviceManageDto}
     * @return the same task with end time and status set: FAIL when the parameter is empty,
     *         ROLLBACK when any step failed (the parameter is rewritten to carry the osd id, which
     *         is {@code null} if the osd id was never determined), FINISH otherwise
     */
    @Override
    public Task execute(Task task) {
        String taskParam = task.getTaskParam();
        if (ObjectUtils.isEmpty(taskParam)) {
            task.setTaskEndTime(LocalDateTime.now());
            task.setTaskStatus(TaskStatusEnum.FAIL.getStatus());
            return task;
        }

        NodeDeviceManageDto nodeDeviceManageDto = JSONUtil.toBean(taskParam, NodeDeviceManageDto.class);
        Integer taskId = task.getId();
        Integer osdId = null;
        List<Step> steps = stepService.list(new LambdaQueryWrapper<Step>().eq(Step::getTaskId, taskId));
        try {
            //1.create the osd
            Step createStep = createOsd(nodeDeviceManageDto, findStep(steps, StepTypeEnum.CREATE_OSD.getType()));

            //2.get the osdId according to success result
            String successMessage = createStep.getSuccessMessage();
            // a missing success message is reported the same way as an unparsable one, not as an NPE
            Matcher matcher = successMessage == null ? null : OSD_ID_PATTERN.matcher(successMessage.trim());
            if (matcher == null || !matcher.matches()) {
                throw new DsmsEngineException(ResultCode.NODE_MOVE_OSD_TO_DEFAULT_ERROR);
            }
            osdId = Integer.parseInt(matcher.group(1));
            nodeDeviceManageDto.setOsdId(osdId);

            //3.move the osd to default location in crushmap
            moveOsd(nodeDeviceManageDto, findStep(steps, StepTypeEnum.MOVE_OSD_TO_DEFAULT.getType()));
        } catch (Exception e) {
            log.error("create osd task fail, fail reason:{}", e.getMessage(), e);
            if (ObjectUtils.isEmpty(task.getTaskErrorMessage())) {
                task.setTaskErrorMessage(e.getMessage());
            }
            // hand the osd id (if any) over to rollback through the task parameter
            nodeDeviceManageDto.setOsdId(osdId);
            task.setTaskParam(JSONUtil.toJsonStr(nodeDeviceManageDto));
            task.setTaskEndTime(LocalDateTime.now());
            task.setTaskStatus(TaskStatusEnum.ROLLBACK.getStatus());
            return task;
        }
        task.setTaskEndTime(LocalDateTime.now());
        task.setTaskStatus(TaskStatusEnum.FINISH.getStatus());
        return task;
    }

    /**
     * Checks that no queued or executing add-osd task already targets the same parameters.
     *
     * @param validateParam at least two strings; a task conflicts when its task message contains
     *                      both {@code validateParam[0]} and {@code validateParam[1]}
     * @return {@code false} when a conflicting task exists, {@code true} otherwise
     */
    @Override
    public boolean validateTask(String[] validateParam) {
        LambdaQueryWrapper<Task> taskQuery = new LambdaQueryWrapper<>();
        taskQuery.in(Task::getTaskStatus, TaskStatusEnum.QUEUE.getStatus(), TaskStatusEnum.EXECUTING.getStatus())
                .eq(Task::getTaskType, TaskTypeEnum.ADD_OSD.getType());
        List<Task> list = taskService.list(taskQuery);

        for (Task task : list) {
            String taskMessage = task.getTaskMessage();
            // a task without a message cannot conflict; skip it instead of failing with an NPE
            if (taskMessage == null) {
                continue;
            }
            if (taskMessage.contains(validateParam[0]) && taskMessage.contains(validateParam[1])) {
                return false;
            }
        }
        return true;
    }

    /**
     * Requests the osd creation and polls the step service until the step is FINISH or FAIL.
     * The step is always persisted with its end time before this method returns or throws.
     *
     * @return the finished step
     * @throws TaskException when the remote call fails or the step ends in FAIL status
     */
    private Step createOsd(NodeDeviceManageDto nodeDeviceManageDto, Step step) {
        String nodeName = nodeDeviceManageDto.getHost();
        String devicePath = nodeDeviceManageDto.getPath();
        //update step status to executing
        step.setStepStatus(TaskStatusEnum.EXECUTING.getStatus());
        try {
            RemoteResponse addOsdResponse = nodeApi.addOsd(RemoteCallUtil.generateRemoteRequest(), nodeName, devicePath);
            List<String> successMsg = List.of(String.format(OsdAddRequest.ADD_OSD_SUCCESS));
            do {
                step = stepService.getStepResponse(step, addOsdResponse, successMsg, CREATE_OSD_ERROR);
            } while (!hasStatus(step, TaskStatusEnum.FINISH) && !hasStatus(step, TaskStatusEnum.FAIL));
            // a step that ended in FAIL without an exception must not be returned as a success
            if (!hasStatus(step, TaskStatusEnum.FINISH)) {
                throw new TaskException(CREATE_OSD_ERROR);
            }
        } catch (Throwable e) {
            // the rethrown TaskException only carries the message, so keep the stack trace in the log
            log.warn("create osd step failed", e);
            throw failStep(step, e.getMessage());
        } finally {
            step.setStepEndTime(LocalDateTime.now());
            stepService.updateById(step);
        }
        return step;
    }

    /**
     * Moves the osd under its host bucket in the crushmap.
     *
     * <p>First makes sure the host bucket exists and waits until the osd is reported as existing,
     * up and with a positive weight; then issues the move request, retrying up to
     * {@code UpdateOsdBucketRequest.RETRY_TIMES} times with a pause before every attempt.
     *
     * @return the finished step
     * @throws TaskException when preparation fails, or when the move did not finish after the
     *                       last retry
     */
    @SneakyThrows
    private Step moveOsd(NodeDeviceManageDto nodeDeviceManageDto, Step step) {
        String hostName = nodeDeviceManageDto.getHost();
        int osdId = nodeDeviceManageDto.getOsdId();
        //update step status to executing
        step.setStepStatus(TaskStatusEnum.EXECUTING.getStatus());

        //some prepare options before move osd
        try {
            //create host bucket if not exist
            CrushmapBucket crushmapBuckets = crushmapApi.getCrushmapBuckets(RemoteCallUtil.generateRemoteRequest());
            boolean hostBucketExists = crushmapBuckets.getBuckets().stream()
                    .anyMatch(bucket -> Objects.equals(bucket.getName(), hostName));
            if (!hostBucketExists) {
                RemoteResponse remoteResponse = crushmapApi.createBucket(RemoteCallUtil.generateRemoteRequest(), hostName, CrushFailureDomainEnum.HOST.getTypeId(), CrushmapConst.DEFAULT_ROOT_BUCKET);
                getUpdateCrushmapResponse(remoteResponse, String.format(CreateBucketRequest.CREATE_HOST_BUCKET_SUCCESS, hostName, CrushmapConst.DEFAULT_ROOT_BUCKET), ResultCode.NODE_MOVE_OSD_TO_DEFAULT_ERROR.getMessage());
            }

            //wait the osd created and join in cluster
            // NOTE(review): this wait has no upper bound; consider adding a timeout.
            while (!isOsdJoined(osdId)) {
                sleep(OSD_JOIN_POLL_INTERVAL);
            }
        } catch (Throwable e) {
            if (e instanceof InterruptedException) {
                // restore the interrupt flag, it is lost once the exception is converted below
                Thread.currentThread().interrupt();
            }
            log.warn("prepare for moving osd.{} failed", osdId, e);
            TaskException failure = failStep(step, e.getMessage());
            // persist the failed step, as createOsd does
            stepService.updateById(step);
            throw failure;
        }

        for (int i = 1; i <= UpdateOsdBucketRequest.RETRY_TIMES; i++) {
            sleep(Duration.ofSeconds(UpdateOsdBucketRequest.RETRY_TIME_SECONDS));
            try {
                step.setStepStatus(TaskStatusEnum.EXECUTING.getStatus());
                //move osd to its host bucket
                RemoteResponse moveOsdResponse = osdApi.updateOsdBucketRequest(RemoteCallUtil.generateRemoteRequest(), osdId, CrushFailureDomainEnum.HOST, hostName);
                List<String> successMsg = List.of(String.format(UpdateOsdBucketRequest.UPDATE_SUCCESS, osdId, osdId, hostName));
                do {
                    step = stepService.getStepResponse(step, moveOsdResponse, successMsg, MOVE_OSD_ERROR);
                } while (!hasStatus(step, TaskStatusEnum.FINISH) && !hasStatus(step, TaskStatusEnum.FAIL));
                if (hasStatus(step, TaskStatusEnum.FINISH)) {
                    break;
                }
            } catch (Throwable e) {
                log.warn("move osd failed,start retry,retry times:{}", i, e);
                if (i == UpdateOsdBucketRequest.RETRY_TIMES) {
                    throw failStep(step, e.getMessage());
                }
            } finally {
                step.setStepEndTime(LocalDateTime.now());
                stepService.updateById(step);
            }
        }

        // every attempt ended without FINISH (e.g. FAIL status but no exception): report the
        // failure instead of letting execute() mark the task as finished
        if (!hasStatus(step, TaskStatusEnum.FINISH)) {
            TaskException failure = failStep(step, MOVE_OSD_ERROR);
            stepService.updateById(step);
            throw failure;
        }
        return step;
    }

    /**
     * Fetches the result of a crushmap update request and verifies its output.
     *
     * @param executeResponse the response of the request whose result is fetched (by its id)
     * @param require         the exact output expected when the request succeeded
     * @param errMsg          prefix of the error message used when the check fails
     * @throws TaskException when the output differs from {@code require}, the request failed, or
     *                       the request state is neither success nor failed
     * @throws Throwable     declared for callers; see the remote API for what it may raise
     */
    public void getUpdateCrushmapResponse(RemoteResponse executeResponse, String require, String errMsg) throws Throwable {
        RemoteResponse resultResponse = crushmapApi.getUpdateCrushmapResult(RemoteCallUtil.generateRemoteRequest(), executeResponse.getId());
        //analytic response result
        if (Objects.equals(resultResponse.getState(), RemoteResponseStatusEnum.SUCCESS.getMessage())) {
            List<FinishedDetail> finished = resultResponse.getFinished();
            // an empty detail list is treated as "no output" rather than an index error
            String out = null;
            if (!ObjectUtils.isEmpty(finished)) {
                String outs = finished.get(0).getOuts();
                out = StringUtils.hasText(outs) ? outs : finished.get(0).getOutb();
            }
            if (!(Objects.equals(require, out))) {
                throw new TaskException(errMsg + ":" + out);
            }
        } else if (Objects.equals(resultResponse.getState(), RemoteResponseStatusEnum.FAILED.getMessage())) {
            List<FailedDetail> failed = resultResponse.getFailed();
            String out = null;
            if (!ObjectUtils.isEmpty(failed)) {
                String outs = failed.get(0).getOuts();
                out = StringUtils.hasText(outs) ? outs : failed.get(0).getOutb();
            }
            throw new TaskException(errMsg + "," + out);
        } else {
            throw new TaskException("unknown request state,response:" + JSON.toJSONString(resultResponse));
        }
    }

    /**
     * Undoes a failed add-osd task: stops and purges the osd when it is still listed, then zaps
     * the device.
     *
     * @param task the task to roll back; its task parameter is parsed as a {@link NodeDeviceManageDto}
     * @return the same task with its status set: FAIL when the parameter is empty, ROLLBACK when
     *         the rollback itself failed, ROLLBACK_SUCCESS otherwise; the end time is set in every case
     */
    @Override
    public Task rollback(Task task) {
        String taskParam = task.getTaskParam();
        if (ObjectUtils.isEmpty(taskParam)) {
            task.setTaskEndTime(LocalDateTime.now());
            task.setTaskStatus(TaskStatusEnum.FAIL.getStatus());
            return task;
        }

        NodeDeviceManageDto nodeDeviceManageDto = JSONUtil.toBean(taskParam, NodeDeviceManageDto.class);
        Integer osdId = nodeDeviceManageDto.getOsdId();
        try {
            //1.stop and purge osd if it is exists
            List<OSDStatusResult> osds = osdApi.listOsdStatus(RemoteCallUtil.generateRemoteRequest());
            boolean osdExists = osds.stream().anyMatch(item -> Objects.equals(item.getOsd(), osdId));
            if (osdExists) {
                removeOsdTask.stopOsd(osdId, new Step());
                removeOsdTask.purgeOsd(osdId, new Step());
            }
            //2.zap the device of osd
            removeOsdTask.zapOsd(nodeDeviceManageDto, new Step(), task.getTaskName());
        } catch (Throwable e) {
            log.error("add osd error,and can not rollback:reason{}", e.getMessage(), e);
            task.setTaskStatus(TaskStatusEnum.ROLLBACK.getStatus());
            return task;
        } finally {
            task.setTaskEndTime(LocalDateTime.now());
        }
        task.setTaskStatus(TaskStatusEnum.ROLLBACK_SUCCESS.getStatus());
        return task;
    }

    /** Returns the first step of the given type, or a fresh {@link Step} when there is none. */
    private static Step findStep(List<Step> steps, Object stepType) {
        return steps.stream()
                .filter(s -> Objects.equals(s.getStepType(), stepType))
                .findFirst()
                .orElseGet(Step::new);
    }

    /** Tells whether the step currently carries the given status. */
    private static boolean hasStatus(Step step, TaskStatusEnum status) {
        return Objects.equals(step.getStepStatus(), status.getStatus());
    }

    /**
     * Marks the step as failed (status, end time, and error message unless one is already set)
     * and builds the exception to throw; the step is not persisted here.
     */
    private static TaskException failStep(Step step, String fallbackMessage) {
        step.setStepStatus(TaskStatusEnum.FAIL.getStatus());
        step.setStepEndTime(LocalDateTime.now());
        if (ObjectUtils.isEmpty(step.getStepErrorMessage())) {
            step.setStepErrorMessage(fallbackMessage);
        }
        return new TaskException(step.getStepErrorMessage());
    }

    /** Tells whether the osd is listed as existing and up with a weight greater than zero. */
    private boolean isOsdJoined(int osdId) throws Throwable {
        List<OSDStatusResult> osdStatus = osdApi.listOsdStatus(RemoteCallUtil.generateRemoteRequest());
        return osdStatus.stream().anyMatch(item -> Objects.equals(item.getOsd(), osdId)
                && item.getState().contains(OdsStatusEnum.EXISTS.getStatus())
                && item.getState().contains(OdsStatusEnum.UP.getStatus())
                && item.getWeight() > 0);
    }
}
